{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This benchmark measures the performance of run-related operations in Kubeflow Pipelines, including run durations and the latencies of creating/getting/deleting runs.\n",
    "\n",
    "import random\n",
    "import kfp\n",
    "import kfp_server_api\n",
    "import os\n",
    "import string\n",
    "import time\n",
    "from google.cloud import storage\n",
    "from kfp.components import create_component_from_func\n",
    "from datetime import datetime, timezone, timedelta\n",
    "\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import seaborn as sns\n",
    "import matplotlib.pyplot as plt\n",
    "from scipy import stats\n",
    "\n",
    "# CHANGE necessary parameters here\n",
    "# host is your KFP endpoint\n",
    "host = 'http://127.0.0.1:3001'\n",
    "# Use the pipeline you prefer\n",
    "pipeline_file_url = 'https://storage.googleapis.com/jingzhangjz-project-pipelines/benchmarks/taxi.yaml'\n",
    "# number of runs you want to create\n",
    "num_runs = 5\n",
    "# Periodically check whether the runs have been finished.\n",
    "run_status_polling_interval_sec = 60\n",
    "\n",
    "\n",
    "def random_suffix(length: int = 10) -> str:\n",
    "    \"\"\"Return a random string of lowercase ASCII letters and digits.\n",
    "\n",
    "    Used as a suffix for the pipeline, experiment and run names created by this benchmark.\n",
    "\n",
    "    Args:\n",
    "        length: Number of characters to generate; defaults to 10.\n",
    "\n",
    "    Returns:\n",
    "        A string of `length` characters drawn from a-z and 0-9.\n",
    "    \"\"\"\n",
    "    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))\n",
    "\n",
    "def run_finished(run_status: str) -> bool:\n",
    "    \"\"\"Tell whether a run status string denotes a terminal state.\n",
    "\n",
    "    Args:\n",
    "        run_status: Status string reported for a run.\n",
    "\n",
    "    Returns:\n",
    "        True if the status is one of 'Succeeded', 'Failed', 'Error', 'Skipped' or 'Terminated';\n",
    "        False for any other value (including None).\n",
    "    \"\"\"\n",
    "    return run_status in {'Succeeded', 'Failed', 'Error', 'Skipped', 'Terminated'}\n",
    "\n",
    "def run_succeeded(run_status: str) -> bool:\n",
    "    \"\"\"Tell whether a run status string denotes a successful run.\n",
    "\n",
    "    Args:\n",
    "        run_status: Status string reported for a run.\n",
    "\n",
    "    Returns:\n",
    "        True only when the status is exactly 'Succeeded'; False otherwise (including None).\n",
    "    \"\"\"\n",
    "    return run_status == 'Succeeded'\n",
    "\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    # Benchmark driver: create a pipeline and an experiment, launch `num_runs` runs, wait until all of\n",
    "    # them reach a terminal state, time the create/get/delete run calls, clean up, and plot the results.\n",
    "    client = kfp.Client(host)\n",
    "\n",
    "    # Create a pipeline and we'll use its default version to create runs.\n",
    "    api_url = kfp_server_api.models.ApiUrl(pipeline_file_url)\n",
    "    api_pipeline = kfp_server_api.models.ApiPipeline(\n",
    "        name='pipeline-' + random_suffix(),\n",
    "        url=api_url)\n",
    "    pipeline = client.pipelines.create_pipeline(body=api_pipeline)\n",
    "    default_version_id = pipeline.default_version.id\n",
    "\n",
    "    # Create an experiment.\n",
    "    experiment_name = 'experiment-' + random_suffix()\n",
    "    experiment = client.experiments.create_experiment(body={'name' : experiment_name})\n",
    "    experiment_id = experiment.id\n",
    "\n",
    "    # Every run is owned by the experiment and created from the pipeline's default version. These\n",
    "    # references are the same for all runs, so they are built once rather than on every iteration.\n",
    "    resource_references = [\n",
    "        kfp_server_api.models.ApiResourceReference(\n",
    "            key=kfp_server_api.models.ApiResourceKey(\n",
    "                id=experiment_id, type=kfp_server_api.models.ApiResourceType.EXPERIMENT),\n",
    "            relationship=kfp_server_api.models.ApiRelationship.OWNER),\n",
    "        kfp_server_api.models.ApiResourceReference(\n",
    "            key=kfp_server_api.models.ApiResourceKey(\n",
    "                id=default_version_id, type=kfp_server_api.models.ApiResourceType.PIPELINE_VERSION),\n",
    "            relationship=kfp_server_api.models.ApiRelationship.CREATOR),\n",
    "    ]\n",
    "\n",
    "    # If the pipeline you choose needs to specify parameters to create a run, specify it here.\n",
    "    parameters = [\n",
    "        kfp_server_api.ApiParameter(name='pipeline-root', value='gs://jingzhangjz-project-outputs/tfx_taxi_simple/{{workflow.uid}}'),\n",
    "        kfp_server_api.ApiParameter(name='data-root', value='gs://ml-pipeline-playground/tfx_taxi_simple/data'),\n",
    "        kfp_server_api.ApiParameter(name='module-file', value='gs://ml-pipeline-playground/tfx_taxi_simple/modules/taxi_utils.py'),\n",
    "    ]\n",
    "    pipeline_spec = kfp_server_api.ApiPipelineSpec(parameters=parameters)\n",
    "\n",
    "    # Measure create run latency. Note this time is the roundtrip latency of the CreateRun method. The actual\n",
    "    # run is not finished when the client side gets the CreateRun response. Run duration will be measured\n",
    "    # below when the run is actually finished.\n",
    "    created_runs = []\n",
    "    create_run_latencies = []\n",
    "    for _ in range(num_runs):\n",
    "        # Generate the name before starting the timer so that only the API call is measured.\n",
    "        run_name = 'run-' + random_suffix()\n",
    "        start = time.perf_counter()\n",
    "        run = client.runs.create_run(body={'name':run_name, 'resource_references': resource_references, 'pipeline_spec': pipeline_spec})\n",
    "        create_run_latencies.append(time.perf_counter() - start)\n",
    "        created_runs.append(run.run.id)\n",
    "\n",
    "    # Wait for the runs to finish. all() stops at the first unfinished run, so a polling round issues no\n",
    "    # more GetRun calls than necessary.\n",
    "    # TODO(jingzhang36): We can add a timeout for this polling. For now we rely on the timeout of runs in KFP.\n",
    "    while not all(run_finished(client.runs.get_run(run_id).run.status) for run_id in created_runs):\n",
    "        time.sleep(run_status_polling_interval_sec)\n",
    "\n",
    "    # When all runs are finished, measure run durations and get run latencies.\n",
    "    get_run_latencies = []\n",
    "    succeeded_run_durations = []\n",
    "    run_results = []\n",
    "    for run_id in created_runs:\n",
    "        start = time.perf_counter()\n",
    "        run = client.runs.get_run(run_id)\n",
    "        get_run_latencies.append(time.perf_counter() - start)\n",
    "        if run_succeeded(run.run.status):\n",
    "            run_results.append('succeeded')\n",
    "            # Only successful runs contribute a duration sample.\n",
    "            succeeded_run_durations.append((run.run.finished_at - run.run.created_at).total_seconds())\n",
    "        else:\n",
    "            run_results.append('not_succeeded')\n",
    "\n",
    "    # Measure delete run latency.\n",
    "    delete_run_latencies = []\n",
    "    for run_id in created_runs:\n",
    "        start = time.perf_counter()\n",
    "        client.runs.delete_run(run_id)\n",
    "        delete_run_latencies.append(time.perf_counter() - start)\n",
    "\n",
    "    # Cleanup\n",
    "    client.pipelines.delete_pipeline(pipeline.id)\n",
    "    client.experiments.delete_experiment(experiment.id)\n",
    "\n",
    "    # Plots: one histogram (with rug) per measurement, stacked vertically.\n",
    "    # NOTE: sns.distplot is deprecated in seaborn >= 0.11; switch to histplot/rugplot when upgrading.\n",
    "    fig, axs = plt.subplots(nrows=4, figsize=(10,20))\n",
    "    latency_plots = [\n",
    "        ('Create Run Latency', 'Create', create_run_latencies),\n",
    "        ('Run Durations', 'Run', succeeded_run_durations),\n",
    "        ('Get Run Latency', 'Get', get_run_latencies),\n",
    "        ('Delete Run Latency', 'Delete', delete_run_latencies),\n",
    "    ]\n",
    "    for ax, (title, ylabel, samples) in zip(axs, latency_plots):\n",
    "        ax.set(title=title, xlabel='Time (Second)', ylabel=ylabel)\n",
    "        sns.distplot(a=samples, ax=ax, hist=True, kde=False, rug=True)\n",
    "\n",
    "    # Count of succeeded vs. not-succeeded runs.\n",
    "    loaded_run_results = pd.DataFrame({'result': run_results})\n",
    "    sns.catplot(x='result', kind='count', data=loaded_run_results)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "file_extension": ".py",
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  },
  "mimetype": "text/x-python",
  "name": "python",
  "npconvert_exporter": "python",
  "pygments_lexer": "ipython3",
  "version": 3
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
